VARMAX (VARMA with eXogenous inputs)#

Goals#

  • Precisely define VARX and VARMAX, and how they extend VAR/VARMA.

  • Build intuition for exogenous inputs as known drivers / control signals.

  • Understand complexity and identifiability pitfalls (exogeneity, collinearity, MA issues).

  • Implement simulation + VARX fitting + forecasting + shock propagation in NumPy.

  • Visualize multivariate forecasts and shock propagation with Plotly.

Prerequisites#

  • VAR/VARMA basics (previous folder)

  • Linear regression / OLS

1) Definitions: VARX vs VARMAX#

Let (\mathbf{y}_t\in\mathbb{R}^k) be the target vector and (\mathbf{x}_t\in\mathbb{R}^m) be a vector of exogenous regressors (known inputs).

VARX(p,r)#

A VARX (sometimes written VAR(p)+X) extends VAR with exogenous lags:

\[ \mathbf{y}_t = \mathbf{c} + \sum_{i=1}^{p} A_i\,\mathbf{y}_{t-i} + \sum_{\ell=0}^{r} B_{\ell}\,\mathbf{x}_{t-\ell} + \boldsymbol{\varepsilon}_t. \]

VARMAX(p,q,r)#

A VARMAX is the full ARMA + X model:

\[ \mathbf{y}_t = \mathbf{c} + \sum_{i=1}^{p} A_i\,\mathbf{y}_{t-i} + \sum_{\ell=0}^{r} B_{\ell}\,\mathbf{x}_{t-\ell} + \boldsymbol{\varepsilon}_t + \sum_{j=1}^{q} M_j\,\boldsymbol{\varepsilon}_{t-j}. \]

Transfer-function (system) view#

Using lag polynomials

\[ A(L)= I - A_1 L - \cdots - A_p L^p,\quad B(L)= B_0 + B_1 L + \cdots + B_r L^r,\quad M(L)= I + M_1 L + \cdots + M_q L^q, \]

the model becomes

\[ A(L)\,\mathbf{y}_t = \mathbf{c} + B(L)\,\mathbf{x}_t + M(L)\,\boldsymbol{\varepsilon}_t. \]

So VARMAX is a multi-input multi-output (MIMO) linear system:

  • (A(L)^{-1}B(L)): response of outputs to inputs

  • (A(L)^{-1}M(L)): response of outputs to shocks
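For intuition, the transfer-function identity can be checked numerically in a scalar special case (k = m = 1, no exogenous part, hypothetical coefficients): the coefficients of (A(L)^{-1}M(L)) obtained from the standard ARMA recursion must match the direct series expansion of ((1+mL)/(1-aL)). A minimal sketch:

```python
import numpy as np

# Scalar ARMA(1,1): A(L) = 1 - a L, M(L) = 1 + m L (illustrative values)
a, m = 0.5, 0.3
H = 6

# Psi_h via the usual recursion: Psi_0 = 1, Psi_h = M_h + a * Psi_{h-1}, with M_h = 0 for h > 1
psi = np.zeros(H + 1)
psi[0] = 1.0
for h in range(1, H + 1):
    psi[h] = (m if h == 1 else 0.0) + a * psi[h - 1]

# Direct expansion: (1 + m L) / (1 - a L) = sum_h (a^h + m a^{h-1}) L^h
expansion = np.array([1.0] + [a**h + m * a ** (h - 1) for h in range(1, H + 1)])

print(np.allclose(psi, expansion))  # True: the recursion and the series expansion agree
```

The same identity is what the matrix-valued `varma_irf` recursion later in this notebook exploits, just with (k\times k) blocks instead of scalars.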

2) What “exogenous” really requires#

For OLS-style estimation of VARX, a common assumption is strict exogeneity:

\[\mathbb{E}[\boldsymbol{\varepsilon}_t\mid \mathbf{x}_1,\dots,\mathbf{x}_T] = 0 \quad \text{for all } t.\]

If (\mathbf{x}_t) is correlated with (\varepsilon_t) (simultaneity, omitted variables, feedback), then naive OLS is biased.

Practical examples of valid-ish (\mathbf{x}_t):

  • calendar effects / holidays

  • known interventions (pricing change, policy shock)

  • lagged covariates measured without error

Risky (\mathbf{x}_t): contemporaneous variables influenced by (\mathbf{y}_t) itself.
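To see why this matters, here is a deliberately stripped-down static regression (a hypothetical univariate sketch, separate from the VARX machinery): when the regressor is correlated with the contemporaneous error, the OLS slope stays biased no matter how large the sample.

```python
import numpy as np

rng = np.random.default_rng(0)
n = 100_000
b_true = 0.5
e = rng.standard_normal(n)

x_good = rng.standard_normal(n)            # independent of e: exogenous
x_bad = 0.8 * e + rng.standard_normal(n)   # correlated with e: endogenous

for name, x in [("exogenous x", x_good), ("endogenous x", x_bad)]:
    y = b_true * x + e
    b_hat = (x @ y) / (x @ x)              # univariate OLS slope
    print(f"{name}: b_hat = {b_hat:.3f} (true 0.5)")
```

With the endogenous regressor, `b_hat` converges to (b + \mathrm{cov}(x,\varepsilon)/\mathrm{var}(x)), not to (b): more data does not help.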

3) Complexity + identifiability pitfalls#

Parameter count (ignoring (\Sigma)) for VARMAX(p,q,r):

\[k \; + \; k^2(p+q) \; + \; k m (r+1).\]
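As a quick sanity check, the count can be wrapped in a tiny helper (a hypothetical convenience, not part of the code below) and evaluated for the k = 2, m = 1 VARMAX(2,1,1) system simulated later in this notebook:

```python
def varmax_param_count(k: int, m: int, p: int, q: int, r: int) -> int:
    """Mean-equation parameters of VARMAX(p,q,r): intercept + AR/MA matrices + exogenous blocks."""
    return k + k * k * (p + q) + k * m * (r + 1)

# k=2 outputs, m=1 input, p=2 AR lags, q=1 MA lag, r=1 exogenous lag
print(varmax_param_count(k=2, m=1, p=2, q=1, r=1))  # 2 + 4*(2+1) + 2*1*(1+1) = 18
```

Eighteen mean parameters for a two-dimensional system shows how quickly the count grows in (k^2); doubling (k) roughly quadruples the AR/MA block sizes.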

Pitfalls:

  • Collinearity: the lagged inputs (\mathbf{x}_{t-\ell}) can be highly correlated with each other and with the lagged outputs (\mathbf{y}_{t-i}).

  • Exogeneity violations: if inputs are not exogenous, estimates can be biased.

  • MA identifiability: all VARMA issues still apply (invertibility, cancellation, non-unique parameterizations).

Tradeoff intuition:

  • Adding (\mathbf{x}_t) can reduce unexplained variance and improve interpretability.

  • Adding an MA part can fix autocorrelated residuals, but estimation becomes substantially harder.

4) Intuition with abstract systems#

Think of VARMAX as a linear dynamical system with:

  • State feedback: (A_i) couples past outputs into the present.

  • Control / forcing inputs: (B_\ell) injects known signals (\mathbf{x}) (with possible delays).

  • Disturbances: (\varepsilon_t) are random shocks.

  • Shock filter: (M_j) lets shocks persist briefly (colored noise).

Two useful “shock propagation” objects:

  1. Innovation impulse response (\Psi_h): effect of a one-time shock in (\varepsilon_t) on future (\mathbf{y}).

  2. Dynamic multipliers (\Theta_h): effect of a one-time pulse in (\mathbf{x}_t) on future (\mathbf{y}).

When (q=0) (VARX), (\Theta_h) satisfies a simple recursion (see code).

import sys

import numpy as np
import plotly
import plotly.graph_objects as go
import os
import plotly.io as pio
from plotly.subplots import make_subplots

pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")

np.set_printoptions(precision=4, suppress=True)

SEED = 7
rng = np.random.default_rng(SEED)

print("Python:", sys.version.split()[0])
print("NumPy:", np.__version__)
print("Plotly:", plotly.__version__)
Python: 3.12.9
NumPy: 1.26.2
Plotly: 6.5.2
def simulate_varmax(
    x: np.ndarray,
    A_list: list[np.ndarray],
    B_list: list[np.ndarray],
    M_list: list[np.ndarray] | None = None,
    c: np.ndarray | None = None,
    Sigma: np.ndarray | None = None,
    burnin: int = 200,
    seed: int = 0,
) -> tuple[np.ndarray, np.ndarray]:
    """Simulate VARMAX(p,q,r) with provided exogenous series x.

    y_t = c + sum_i A_i y_{t-i} + sum_l B_l x_{t-l} + e_t + sum_j M_j e_{t-j}

    Args:
      x: (T, m) exogenous inputs. Returned y will have length (T - burnin - max_lag).
      B_list: [B0, B1, ..., Br] where B_l is (k,m)
      M_list: [M1, ..., Mq] where M_j is (k,k)

    Returns:
      y: (n, k)
      e: (n, k)
    """

    rng_local = np.random.default_rng(seed)

    x = np.asarray(x, dtype=float)
    if x.ndim != 2:
        raise ValueError("x must be (T,m)")
    T_x, m = x.shape

    p = len(A_list)
    r = len(B_list) - 1
    q = 0 if (M_list is None) else len(M_list)
    M_list = [] if M_list is None else M_list

    if p == 0 and r < 0 and q == 0:
        raise ValueError("Need at least one of p, r, q")

    k = A_list[0].shape[0] if p > 0 else (B_list[0].shape[0] if B_list else M_list[0].shape[0])

    for A in A_list:
        if A.shape != (k, k):
            raise ValueError("All A_i must be (k,k)")
    for B in B_list:
        if B.shape != (k, m):
            raise ValueError("All B_l must be (k,m)")
    for M in M_list:
        if M.shape != (k, k):
            raise ValueError("All M_j must be (k,k)")

    if c is None:
        c = np.zeros(k)
    c = np.asarray(c, dtype=float)
    if c.shape != (k,):
        raise ValueError("c must be (k,)")

    if Sigma is None:
        Sigma = np.eye(k)
    Sigma = np.asarray(Sigma, dtype=float)
    if Sigma.shape != (k, k):
        raise ValueError("Sigma must be (k,k)")

    max_lag = max(p, r, q)
    if T_x <= burnin + max_lag:
        raise ValueError("x is too short for chosen lags + burnin")

    L = np.linalg.cholesky(Sigma)
    e = rng_local.standard_normal((T_x, k)) @ L.T
    y = np.zeros((T_x, k), dtype=float)

    for t in range(max_lag, T_x):
        yt = c.copy()

        for i, A in enumerate(A_list, start=1):
            yt += A @ y[t - i]

        for ell, B in enumerate(B_list):
            yt += B @ x[t - ell]

        yt += e[t]

        for j, M in enumerate(M_list, start=1):
            yt += M @ e[t - j]

        y[t] = yt

    sl = slice(burnin + max_lag, T_x)
    return y[sl], e[sl]


def varx_ols_fit(
    y: np.ndarray,
    x: np.ndarray,
    p: int,
    r: int,
    add_intercept: bool = True,
) -> dict:
    """Fit VARX(p,r) by stacked OLS.

    y_t = c + sum_i A_i y_{t-i} + sum_{ell=0}^r B_ell x_{t-ell} + e_t

    Returns c, A_list, B_list, residuals, Sigma_hat.
    """

    y = np.asarray(y, dtype=float)
    x = np.asarray(x, dtype=float)
    if y.ndim != 2 or x.ndim != 2:
        raise ValueError("y and x must be 2D")

    T, k = y.shape
    Tx, m = x.shape
    if Tx != T:
        raise ValueError("x and y must have the same length")

    max_lag = max(p, r)
    if T <= max_lag:
        raise ValueError("Need T > max(p,r)")

    Y = y[max_lag:]

    X_blocks = []
    if add_intercept:
        X_blocks.append(np.ones((T - max_lag, 1)))

    for lag in range(1, p + 1):
        X_blocks.append(y[max_lag - lag : T - lag])

    for lag in range(0, r + 1):
        X_blocks.append(x[max_lag - lag : T - lag])

    X = np.concatenate(X_blocks, axis=1)
    B_hat, *_ = np.linalg.lstsq(X, Y, rcond=None)

    Y_hat = X @ B_hat
    E = Y - Y_hat

    offset = 0
    c_hat = np.zeros(k)
    if add_intercept:
        c_hat = B_hat[0]
        offset = 1

    A_hat = []
    for i in range(p):
        A_hat.append(B_hat[offset + i * k : offset + (i + 1) * k].T)

    offset2 = offset + p * k
    B_list = []
    for ell in range(r + 1):
        block = B_hat[offset2 + ell * m : offset2 + (ell + 1) * m].T
        B_list.append(block)

    return {
        'c': c_hat,
        'A': A_hat,
        'B': B_list,
        'resid': E,
        'Sigma': (E.T @ E) / (T - max_lag),
        'X': X,
        'Y': Y,
    }


def varx_forecast_paths(
    y_hist: np.ndarray,
    x_hist: np.ndarray,
    x_future: np.ndarray,
    c: np.ndarray,
    A_list: list[np.ndarray],
    B_list: list[np.ndarray],
    Sigma: np.ndarray,
    n_paths: int = 500,
    seed: int = 0,
) -> np.ndarray:
    """Monte Carlo forecasts for VARX with known future x.

    Returns paths: (n_paths, H, k)
    """

    rng_local = np.random.default_rng(seed)

    y_hist = np.asarray(y_hist, dtype=float)
    x_hist = np.asarray(x_hist, dtype=float)
    x_future = np.asarray(x_future, dtype=float)

    T, k = y_hist.shape
    Tx, m = x_hist.shape
    if Tx != T:
        raise ValueError("x_hist and y_hist must match")

    H, m2 = x_future.shape
    if m2 != m:
        raise ValueError("x_future must have same m as x_hist")

    p = len(A_list)
    r = len(B_list) - 1
    max_lag = max(p, r)
    if T < max_lag:
        raise ValueError("Need enough history")

    L = np.linalg.cholesky(Sigma)

    # stitch x into one array for lag lookup
    x_ext = np.vstack([x_hist, x_future])

    paths = np.zeros((n_paths, H, k), dtype=float)
    for s in range(n_paths):
        y_ext = np.zeros((T + H, k), dtype=float)
        y_ext[:T] = y_hist

        e_fut = rng_local.standard_normal((H, k)) @ L.T

        for t in range(T, T + H):
            yt = c.copy()

            for i, A in enumerate(A_list, start=1):
                yt += A @ y_ext[t - i]

            for ell, B in enumerate(B_list):
                yt += B @ x_ext[t - ell]

            yt += e_fut[t - T]

            y_ext[t] = yt

        paths[s] = y_ext[T:]

    return paths


def varx_dynamic_multiplier(A_list: list[np.ndarray], B_list: list[np.ndarray], steps: int) -> np.ndarray:
    """Theta_0..Theta_steps where y_{t+h} response to a unit pulse in x_t.

    For VARX: y_t = sum_i A_i y_{t-i} + sum_ell B_ell x_{t-ell}.

    Recursion:
      Theta_h = B_h + sum_{i=1}^{min(p,h)} A_i Theta_{h-i}
      with B_h = 0 for h>r.

    Returns: (steps+1, k, m)
    """

    p = len(A_list)
    r = len(B_list) - 1
    k, m = B_list[0].shape

    Theta = np.zeros((steps + 1, k, m))

    for h in range(0, steps + 1):
        Bh = B_list[h] if h <= r else np.zeros((k, m))
        acc = Bh.copy()
        for i in range(1, min(p, h) + 1):
            acc += A_list[i - 1] @ Theta[h - i]
        Theta[h] = acc

    return Theta



def varma_irf(A_list: list[np.ndarray], M_list: list[np.ndarray], steps: int) -> np.ndarray:
    """Impulse response matrices Psi_0..Psi_steps for VARMA(p,q)."""

    p = len(A_list)
    q = len(M_list)
    k = (A_list[0].shape[0] if p > 0 else M_list[0].shape[0])

    Psi = np.zeros((steps + 1, k, k))
    Psi[0] = np.eye(k)

    for h in range(1, steps + 1):
        Mh = M_list[h - 1] if (1 <= h <= q) else np.zeros((k, k))
        acc = Mh.copy()
        for i in range(1, min(p, h) + 1):
            acc += A_list[i - 1] @ Psi[h - i]
        Psi[h] = acc

    return Psi

5) Synthetic example: VARMAX dynamics (simulate) + VARX estimation baseline#

We simulate a 2D output (\mathbf{y}_t) driven by:

  • its own lags (VAR part),

  • a 1D exogenous signal (x_t) (input),

  • and a short MA(1) disturbance term.

Then we fit a VARX (no MA) by OLS as a baseline.

# Exogenous input: a smooth driver + an intervention step
T = 900

t = np.arange(T)

x = np.zeros((T, 1))
x[:, 0] = 0.6 * np.sin(2 * np.pi * t / 60) + 0.25 * np.sin(2 * np.pi * t / 15)

# intervention at t=600
x[t >= 600, 0] += 1.0

# VAR part (stable)
A1 = np.array([[0.55, 0.10],
               [-0.05, 0.35]])
A2 = np.array([[-0.20, 0.03],
               [0.01, -0.12]])
A = [A1, A2]

# Exogenous lags: B0 (instant), B1 (delayed)
B0 = np.array([[0.50],
               [0.10]])
B1 = np.array([[0.15],
               [0.25]])
B = [B0, B1]

# MA(1) disturbance
M1 = np.array([[0.45, 0.00],
               [0.20, 0.20]])
M = [M1]

c = np.array([0.0, 0.0])
Sigma = np.array([[0.8, 0.25],
                  [0.25, 0.6]])

# Simulate
Y, E = simulate_varmax(x=x, A_list=A, B_list=B, M_list=M, c=c, Sigma=Sigma, burnin=250, seed=SEED)
X = x[250 + max(len(A), len(B) - 1, len(M)):]  # drop burnin + max_lag so x aligns with returned Y

print("Y:", Y.shape, "X:", X.shape)

# Plot y and x
T_eff = Y.shape[0]
t_eff = np.arange(T_eff)

fig = make_subplots(
    rows=3,
    cols=1,
    shared_xaxes=True,
    vertical_spacing=0.05,
    subplot_titles=["y[0]", "y[1]", "x (exogenous)"]
)

fig.add_trace(go.Scatter(x=t_eff, y=Y[:, 0], mode="lines", name="y0"), row=1, col=1)
fig.add_trace(go.Scatter(x=t_eff, y=Y[:, 1], mode="lines", name="y1"), row=2, col=1)
fig.add_trace(go.Scatter(x=t_eff, y=X[:, 0], mode="lines", name="x"), row=3, col=1)

fig.update_layout(height=620, title="Simulated VARMAX outputs driven by exogenous input")
fig.show()
Y: (648, 2) X: (648, 1)

Fit a VARX baseline (ignoring MA)#

When (q>0) the residuals from a VARX fit can remain autocorrelated (because the true disturbance is MA). Still, VARX is often a useful baseline.

fit = varx_ols_fit(y=Y, x=X, p=2, r=1, add_intercept=True)
print("c_hat:", fit["c"])
print("B0_hat:")
print(fit["B"][0])
print("B1_hat:")
print(fit["B"][1])
print("Sigma_hat:")
print(fit["Sigma"])
c_hat: [ 0.0881 -0.0713]
B0_hat:
[[-0.4179]
 [ 0.0591]]
B1_hat:
[[0.8749]
 [0.2066]]
Sigma_hat:
[[0.7877 0.2877]
 [0.2877 0.6314]]
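The residual-autocorrelation warning above is easy to demonstrate with a self-contained univariate sketch (hypothetical parameters, separate from the fit above): simulate an ARMA(1,1), fit a misspecified pure AR(1) by OLS, and check the residual lag-1 autocorrelation.

```python
import numpy as np

rng = np.random.default_rng(1)
n = 20_000
a_true, m_true = 0.5, 0.6

# ARMA(1,1): y_t = a y_{t-1} + e_t + m e_{t-1}
e = rng.standard_normal(n)
y = np.zeros(n)
for t in range(1, n):
    y[t] = a_true * y[t - 1] + e[t] + m_true * e[t - 1]

# OLS fit of a misspecified AR(1), ignoring the MA part
Y, X = y[1:], y[:-1]
a_hat = (X @ Y) / (X @ X)
resid = Y - a_hat * X
rho1 = np.corrcoef(resid[1:], resid[:-1])[0, 1]

print(f"a_hat = {a_hat:.3f}, residual lag-1 autocorr = {rho1:.3f}")
```

The AR coefficient is pulled away from its true value and the residuals keep a clearly nonzero lag-1 autocorrelation; the same mechanism is why a VARX residual check (e.g. a multivariate portmanteau test) is worth running before trusting the baseline.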

6) Multivariate forecasts with known future inputs#

Forecasting with exogenous inputs is typically scenario-based:

  • decide or forecast (x_{T+1:T+H}),

  • propagate the model forward.

Below we forecast (H) steps ahead using the fitted VARX parameters.

T0 = 520
H = 80

y_hist = Y[:T0]
x_hist = X[:T0]

y_true = Y[T0:T0+H]
x_future = X[T0:T0+H]

paths = varx_forecast_paths(
    y_hist=y_hist,
    x_hist=x_hist,
    x_future=x_future,
    c=fit['c'],
    A_list=fit['A'],
    B_list=fit['B'],
    Sigma=fit['Sigma'],
    n_paths=800,
    seed=SEED + 100,
)

q_lo, q_med, q_hi = np.quantile(paths, [0.05, 0.5, 0.95], axis=0)

x_past = np.arange(T0)
x_fut = np.arange(T0, T0 + H)

fig = make_subplots(rows=3, cols=1, shared_xaxes=True, vertical_spacing=0.05,
                    subplot_titles=["Forecast y[0]", "Forecast y[1]", "x future (given)"])

for i in range(2):
    fig.add_trace(go.Scatter(x=x_past, y=y_hist[:, i], mode="lines", name=f"y{i} history",
                             line=dict(color="rgba(0,0,0,0.55)")), row=i+1, col=1)

    fig.add_trace(go.Scatter(x=x_fut, y=y_true[:, i], mode="lines", name=f"y{i} truth",
                             line=dict(color="rgba(0,0,0,1.0)", width=2)), row=i+1, col=1)

    fig.add_trace(go.Scatter(x=x_fut, y=q_hi[:, i], mode="lines", line=dict(width=0),
                             showlegend=False, hoverinfo="skip"), row=i+1, col=1)
    fig.add_trace(go.Scatter(x=x_fut, y=q_lo[:, i], mode="lines", line=dict(width=0),
                             fill="tonexty", fillcolor="rgba(0,160,120,0.18)",
                             name=f"90% PI y{i}", hoverinfo="skip"), row=i+1, col=1)

    fig.add_trace(go.Scatter(x=x_fut, y=q_med[:, i], mode="lines", name=f"median y{i}",
                             line=dict(color="rgba(0,160,120,1.0)", dash="dash")), row=i+1, col=1)

    fig.add_vline(x=T0, line_width=1, line_dash="dot", line_color="gray", row=i+1, col=1)

fig.add_trace(go.Scatter(x=x_fut, y=x_future[:, 0], mode="lines", name="x given",
                         line=dict(color="rgba(120,120,120,1.0)")), row=3, col=1)

fig.update_layout(height=700, title="VARX forecasts conditional on exogenous path")
fig.show()

7) Shock propagation#

Two kinds:

  • Innovation shocks (\varepsilon_t): use impulse responses (\Psi_h) (same recursion as VARMA).

  • Input shocks (x_t): use dynamic multipliers (\Theta_h) for VARX.

Below we show:

  1. response to a unit pulse in the input at time 0 (dynamic multipliers, using the fitted VARX),

  2. response to a 1-s.d. innovation shock (impulse responses, using the simulated VARMA disturbance).

# (1) Dynamic multipliers: response to a unit pulse in x
H_mul = 25
Theta = varx_dynamic_multiplier(A_list=fit['A'], B_list=fit['B'], steps=H_mul)  # (H+1,k,m)

h = np.arange(H_mul + 1)
resp = Theta[:, :, 0]  # m=1

fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
                    subplot_titles=["y[0] response to x pulse", "y[1] response to x pulse"])

fig.add_trace(go.Scatter(x=h, y=resp[:, 0], mode="lines", name="y0"), row=1, col=1)
fig.add_trace(go.Scatter(x=h, y=resp[:, 1], mode="lines", name="y1"), row=2, col=1)

fig.update_layout(height=520, title="VARX shock propagation from exogenous input (dynamic multipliers)")
fig.update_xaxes(title_text="horizon")
fig.show()


# (2) Innovation impulse responses: response to 1-s.d. shocks in eps
H_irf = 25
Psi = varma_irf(A_list=A, M_list=M, steps=H_irf)  # uses the simulated VARMAX disturbance
shock_std = np.sqrt(np.diag(Sigma))

fig = make_subplots(
    rows=2,
    cols=2,
    shared_xaxes=True,
    shared_yaxes=False,
    horizontal_spacing=0.12,
    vertical_spacing=0.10,
    subplot_titles=[
        "response y0 to shock e0",
        "response y0 to shock e1",
        "response y1 to shock e0",
        "response y1 to shock e1",
    ],
)

h = np.arange(H_irf + 1)
for shock_j in range(2):
    u = np.zeros(2)
    u[shock_j] = shock_std[shock_j]
    resp = Psi @ u  # (H+1, k)

    fig.add_trace(go.Scatter(x=h, y=resp[:, 0], mode="lines", name=f"shock {shock_j}"), row=1, col=shock_j+1)
    fig.add_trace(go.Scatter(x=h, y=resp[:, 1], mode="lines", showlegend=False), row=2, col=shock_j+1)

fig.update_layout(height=520, title="VARMAX impulse responses (1-s.d. innovations)")
fig.update_xaxes(title_text="horizon")
fig.show()

8) Exercises#

  1. Change (B_0) and (B_1) signs and interpret the dynamic multiplier shapes.

  2. Add more exogenous lags (increase (r)) and see how (\Theta_h) changes.

  3. Simulate with stronger MA(1) and compare VARX residual behavior.

References#

  • Lütkepohl, New Introduction to Multiple Time Series Analysis

  • Hamilton, Time Series Analysis

  • Tsay, Multivariate Time Series Analysis